// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

export module infinity_core:physical_limit;

import :query_context;
import :operator_state;
import :physical_operator;
import :physical_operator_type;
import :base_expression;
import :value_expression;
import :data_table;
import :load_meta;
import :infinity_exception;

import internal_types;
import data_type;

namespace infinity {

// Forward declaration only: this interface mentions DataBlock solely inside
// std::vector<std::unique_ptr<DataBlock>> reference parameters, so the complete
// type is not required here.
struct DataBlock;

export class LimitCounter {
public:
    virtual ~LimitCounter() = default;

    // Returns the left index after offset
    virtual size_t Offset(size_t row_count) = 0;

    // Returns the right index after limit
    virtual size_t Limit(size_t row_count) = 0;

    virtual bool IsLimitOver() = 0;

    size_t TotalHitsCount() const { return total_hits_count_; }

    void AddHitsCount(u64 row_count);

private:
    std::atomic<u64> total_hits_count_{};
};

export class AtomicCounter final : public LimitCounter {
public:
    AtomicCounter(i64 offset, i64 limit) : offset_(offset), limit_(limit) {}

    ~AtomicCounter() final = default;

    size_t Offset(size_t row_count) final;

    size_t Limit(size_t row_count) final;

    bool IsLimitOver();

private:
    std::atomic_int64_t offset_{};
    std::atomic_int64_t limit_{};
};

export class UnSyncCounter final : public LimitCounter {
public:
    UnSyncCounter(i64 offset, i64 limit) : offset_(offset), limit_(limit) {}

    ~UnSyncCounter() final = default;

    size_t Offset(size_t row_count) final;

    size_t Limit(size_t row_count) final;

    bool IsLimitOver();

private:
    i64 offset_{};
    std::atomic<i64> limit_{};
};

export class PhysicalLimit final : public PhysicalOperator {
public:
    explicit PhysicalLimit(u64 id,
                           std::unique_ptr<PhysicalOperator> left,
                           std::shared_ptr<BaseExpression> limit_expr,
                           std::shared_ptr<BaseExpression> offset_expr,
                           std::shared_ptr<std::vector<LoadMeta>> load_metas,
                           bool total_hits_count_flag);

    ~PhysicalLimit() final = default;

    void Init(QueryContext *query_context) final;

    static bool Execute(QueryContext *query_context,
                        const std::vector<std::unique_ptr<DataBlock>> &input_blocks,
                        std::vector<std::unique_ptr<DataBlock>> &output_blocks,
                        LimitCounter *counter,
                        bool total_hits_count_flag);

    bool Execute(QueryContext *query_context, OperatorState *operator_state) final;

    [[nodiscard]] inline std::shared_ptr<std::vector<std::string>> GetOutputNames() const final { return left_->GetOutputNames(); }

    [[nodiscard]] inline std::shared_ptr<std::vector<std::shared_ptr<DataType>>> GetOutputTypes() const final { return left_->GetOutputTypes(); }

    size_t TaskletCount() override { return left_->TaskletCount(); }

    [[nodiscard]] inline const std::shared_ptr<BaseExpression> &limit_expr() const { return limit_expr_; }

    [[nodiscard]] inline const std::shared_ptr<BaseExpression> &offset_expr() const { return offset_expr_; }

private:
    std::shared_ptr<BaseExpression> limit_expr_{};
    std::shared_ptr<BaseExpression> offset_expr_{};

    std::unique_ptr<LimitCounter> counter_{};
    bool total_hits_count_flag_{};
};

} // namespace infinity
